home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
pyxmpp
/
streambase.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
20KB
|
751 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
__revision__ = '$Id: streambase.py 652 2006-08-27 19:41:15Z jajcus $'
__docformat__ = 'restructuredtext en'
import libxml2
import socket
import os
import time
import random
import threading
import errno
import logging
from pyxmpp import xmlextra
from pyxmpp.expdict import ExpiringDictionary
from pyxmpp.utils import to_utf8
from pyxmpp.stanza import Stanza
from pyxmpp.error import StreamErrorNode
from pyxmpp.iq import Iq
from pyxmpp.presence import Presence
from pyxmpp.message import Message
from pyxmpp.jid import JID
from pyxmpp import resolver
from pyxmpp.stanzaprocessor import StanzaProcessor
from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError
from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError
STREAM_NS = 'http://etherx.jabber.org/streams'
BIND_NS = 'urn:ietf:params:xml:ns:xmpp-bind'
def stanza_factory(xmlnode, stream = None):
if xmlnode.name == 'iq':
return Iq(xmlnode, stream = stream)
if xmlnode.name == 'message':
return Message(xmlnode, stream = stream)
if xmlnode.name == 'presence':
return Presence(xmlnode, stream = stream)
else:
return Stanza(xmlnode, stream = stream)
class StreamBase(StanzaProcessor, xmlextra.StreamHandler):
def __init__(self, default_ns, extra_ns = (), keepalive = 0, owner = None):
StanzaProcessor.__init__(self)
xmlextra.StreamHandler.__init__(self)
self.default_ns_uri = default_ns
if extra_ns:
self.extra_ns_uris = extra_ns
else:
self.extra_ns_uris = []
self.keepalive = keepalive
self._reader_lock = threading.Lock()
self.process_all_stanzas = False
self.port = None
self._reset()
self.owner = owner
self._StreamBase__logger = logging.getLogger('pyxmpp.Stream')
def _reset(self):
self.doc_in = None
self.doc_out = None
self.socket = None
self._reader = None
self.addr = None
self.default_ns = None
self.extra_ns = { }
self.stream_ns = None
self._reader = None
self.ioreader = None
self.me = None
self.peer = None
self.skip = False
self.stream_id = None
self._iq_response_handlers = ExpiringDictionary()
self._iq_get_handlers = { }
self._iq_set_handlers = { }
self._message_handlers = []
self._presence_handlers = []
self.eof = False
self.initiator = None
self.features = None
self.authenticated = False
self.peer_authenticated = False
self.auth_method_used = None
self.version = None
self.last_keepalive = False
def _connect_socket(self, sock, to = None):
self.eof = 0
self.socket = sock
if to:
self.peer = JID(to)
else:
self.peer = None
self.initiator = 1
self._send_stream_start()
self._make_reader()
def connect(self, addr, port, service = None, to = None):
self.lock.acquire()
try:
return self._connect(addr, port, service, to)
finally:
self.lock.release()
def _connect(self, addr, port, service = None, to = None):
if to is None:
to = str(addr)
if service is not None:
self.state_change('resolving srv', (addr, service))
addrs = resolver.resolve_srv(addr, service)
if not addrs:
addrs = [
(addr, port)]
else:
addrs = [
(addr, port)]
msg = None
for addr, port in addrs:
if type(addr) in (str, unicode):
self.state_change('resolving', addr)
s = None
for res in resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM):
(family, socktype, proto, _unused, sockaddr) = res
try:
s = socket.socket(family, socktype, proto)
self.state_change('connecting', sockaddr)
s.connect(sockaddr)
self.state_change('connected', sockaddr)
except socket.error:
msg = None
self._StreamBase__logger.debug('Connect to %r failed' % (sockaddr,))
if s:
s.close()
s = None
continue
continue
break
if s:
break
continue
if not s:
if msg:
raise socket.error, msg
else:
raise FatalStreamError, 'Cannot connect'
self.addr = addr
self.port = port
self._connect_socket(s, to)
self.last_keepalive = time.time()
def accept(self, sock, myname):
self.lock.acquire()
try:
return self._accept(sock, myname)
finally:
self.lock.release()
def _accept(self, sock, myname):
self.eof = 0
(self.socket, addr) = sock.accept()
self._StreamBase__logger.debug('Connection from: %r' % (addr,))
(self.addr, self.port) = addr
if myname:
self.me = JID(myname)
else:
self.me = None
self.initiator = 0
self._make_reader()
self.last_keepalive = time.time()
def disconnect(self):
self.lock.acquire()
try:
return self._disconnect()
finally:
self.lock.release()
def _disconnect(self):
if self.doc_out:
self._send_stream_end()
def _post_connect(self):
pass
def _post_auth(self):
pass
def state_change(self, state, arg):
self._StreamBase__logger.debug('State: %s: %r' % (state, arg))
def close(self):
self.lock.acquire()
try:
return self._close()
finally:
self.lock.release()
def _close(self):
self._disconnect()
if self.doc_in:
self.doc_in = None
if self.features:
self.features = None
self._reader = None
self.stream_id = None
if self.socket:
self.socket.close()
self._reset()
def _make_reader(self):
self._reader = xmlextra.StreamReader(self)
def stream_start(self, doc):
self.doc_in = doc
self._StreamBase__logger.debug('input document: %r' % (self.doc_in.serialize(),))
try:
r = self.doc_in.getRootElement()
if r.ns().getContent() != STREAM_NS:
self._send_stream_error('invalid-namespace')
raise FatalStreamError, 'Invalid namespace.'
except libxml2.treeError:
self._send_stream_error('invalid-namespace')
raise FatalStreamError, "Couldn't get the namespace."
self.version = r.prop('version')
if self.version and self.version != '1.0':
self._send_stream_error('unsupported-version')
raise FatalStreamError, 'Unsupported protocol version.'
to_from_mismatch = 0
if self.initiator:
self.stream_id = r.prop('id')
peer = r.prop('from')
if peer:
peer = JID(peer)
if self.peer:
if peer and peer != self.peer:
self._StreamBase__logger.debug('peer hostname mismatch: %r != %r' % (peer, self.peer))
to_from_mismatch = 1
else:
self.peer = peer
else:
to = r.prop('to')
if to:
to = self.check_to(to)
if not to:
self._send_stream_error('host-unknown')
raise FatalStreamError, 'Bad "to"'
self.me = JID(to)
self._send_stream_start(self.generate_id())
self._send_stream_features()
self.state_change('fully connected', self.peer)
self._post_connect()
if not self.version:
self.state_change('fully connected', self.peer)
self._post_connect()
if to_from_mismatch:
raise HostMismatch
def stream_end(self, _unused):
self._StreamBase__logger.debug('Stream ended')
self.eof = 1
if self.doc_out:
self._send_stream_end()
if self.doc_in:
self.doc_in = None
self._reader = None
if self.features:
self.features = None
self.state_change('disconnected', self.peer)
def stanza_start(self, doc, node):
pass
def stanza(self, _unused, node):
self._process_node(node)
def error(self, descr):
raise StreamParseError, descr
def _send_stream_end(self):
self.doc_out.getRootElement().addContent(' ')
s = self.doc_out.getRootElement().serialize(encoding = 'UTF-8')
end = s.rindex('<')
try:
self._write_raw(s[end:])
except (IOError, SystemError, socket.error):
e = None
self._StreamBase__logger.debug('Sending stream closing tag failed:' + str(e))
self.doc_out.freeDoc()
self.doc_out = None
if self.features:
self.features = None
def _send_stream_start(self, sid = None):
if self.doc_out:
raise StreamError, 'Stream start already sent'
self.doc_out = libxml2.newDoc('1.0')
root = self.doc_out.newChild(None, 'stream', None)
self.stream_ns = root.newNs(STREAM_NS, 'stream')
root.setNs(self.stream_ns)
self.default_ns = root.newNs(self.default_ns_uri, None)
for prefix, uri in self.extra_ns:
self.extra_ns[uri] = root.newNs(uri, prefix)
if self.peer and self.initiator:
root.setProp('to', self.peer.as_utf8())
if self.me and not (self.initiator):
root.setProp('from', self.me.as_utf8())
root.setProp('version', '1.0')
if sid:
root.setProp('id', sid)
self.stream_id = sid
sr = self.doc_out.serialize(encoding = 'UTF-8')
self._write_raw(sr[:sr.find('/>')] + '>')
def _send_stream_error(self, condition):
if not self.doc_out:
self._send_stream_start()
e = StreamErrorNode(condition)
e.xmlnode.setNs(self.stream_ns)
self._write_raw(e.serialize())
e.free()
self._send_stream_end()
def _restart_stream(self):
self._reader = None
self.doc_out = None
self.doc_in = None
self.features = None
if self.initiator:
self._send_stream_start(self.stream_id)
self._make_reader()
def _make_stream_features(self):
root = self.doc_out.getRootElement()
features = root.newChild(root.ns(), 'features', None)
return features
def _send_stream_features(self):
self.features = self._make_stream_features()
self._write_raw(self.features.serialize(encoding = 'UTF-8'))
def write_raw(self, data):
self.lock.acquire()
try:
return self._write_raw(data)
finally:
self.lock.release()
def _write_raw(self, data):
logging.getLogger('pyxmpp.Stream.out').debug('OUT: %r', data)
try:
self.socket.send(data)
except (IOError, OSError, socket.error):
e = None
raise FatalStreamError('IO Error: ' + str(e))
def _write_node(self, xmlnode):
if self.eof and not (self.socket) or not (self.doc_out):
self._StreamBase__logger.debug('Dropping stanza: %r' % (xmlnode,))
return None
xmlnode = xmlnode.docCopyNode(self.doc_out, 1)
self.doc_out.addChild(xmlnode)
try:
ns = xmlnode.ns()
except libxml2.treeError:
ns = None
if ns and ns.content == xmlextra.COMMON_NS:
xmlextra.replace_ns(xmlnode, ns, self.default_ns)
s = xmlextra.safe_serialize(xmlnode)
self._write_raw(s)
xmlnode.unlinkNode()
xmlnode.freeNode()
def send(self, stanza):
self.lock.acquire()
try:
return self._send(stanza)
finally:
self.lock.release()
def _send(self, stanza):
if not self.version:
try:
err = stanza.get_error()
except ProtocolError:
err = None
if err:
err.downgrade()
self.fix_out_stanza(stanza)
self._write_node(stanza.xmlnode)
def idle(self):
self.lock.acquire()
try:
return self._idle()
finally:
self.lock.release()
def _idle(self):
self._iq_response_handlers.expire()
if not (self.socket) or self.eof:
return None
now = time.time()
if self.keepalive and now - self.last_keepalive >= self.keepalive:
self._write_raw(' ')
self.last_keepalive = now
def fileno(self):
self.lock.acquire()
try:
return self.socket.fileno()
finally:
self.lock.release()
def loop(self, timeout):
self.lock.acquire()
try:
while not (self.eof) and self.socket is not None:
act = self._loop_iter(timeout)
if not act:
self._idle()
continue
finally:
self.lock.release()
def loop_iter(self, timeout):
self.lock.acquire()
try:
return self._loop_iter(timeout)
finally:
self.lock.release()
def _loop_iter(self, timeout):
import select as select
self.lock.release()
try:
(ifd, _unused, efd) = select.select([
self.socket], [], [
self.socket], timeout)
except select.error:
e = None
if e.args[0] != errno.EINTR:
raise
ifd = []
_unused = []
efd = []
finally:
self.lock.acquire()
if self.socket in ifd or self.socket in efd:
self._process()
return True
else:
return False
def process(self):
self.lock.acquire()
try:
self._process()
finally:
self.lock.release()
def _process(self):
try:
try:
self._read()
except (xmlextra.error,):
e = None
self._StreamBase__logger.exception('Exception during read()')
raise StreamParseError(unicode(e))
except:
raise
except (IOError, OSError, socket.error):
e = None
self.close()
raise FatalStreamError('IO Error: ' + str(e))
except (FatalStreamError, KeyboardInterrupt, SystemExit):
e = None
self.close()
raise
def _read(self):
self._StreamBase__logger.debug('StreamBase._read(), socket: %r', self.socket)
if self.eof:
return None
try:
r = self.socket.recv(1024)
except socket.error:
e = None
if e.args[0] != errno.EINTR:
raise
return None
self._feed_reader(r)
def _feed_reader(self, data):
logging.getLogger('pyxmpp.Stream.in').debug('IN: %r', data)
if data:
try:
r = self._reader.feed(data)
while r:
r = self._reader.feed('')
if r is None:
self.eof = 1
self.disconnect()
except StreamParseError:
self._send_stream_error('xml-not-well-formed')
raise
except:
None<EXCEPTION MATCH>StreamParseError
None<EXCEPTION MATCH>StreamParseError
self.eof = 1
self.disconnect()
if self.eof:
self.stream_end(None)
def _process_node(self, xmlnode):
ns_uri = xmlnode.ns().getContent()
if ns_uri == 'http://etherx.jabber.org/streams':
self._process_stream_node(xmlnode)
return None
if ns_uri == self.default_ns_uri:
stanza = stanza_factory(xmlnode, self)
self.lock.release()
try:
self.process_stanza(stanza)
finally:
self.lock.acquire()
stanza.free()
else:
self._StreamBase__logger.debug('Unhandled node: %r' % (xmlnode.serialize(),))
def _process_stream_node(self, xmlnode):
if xmlnode.name == 'error':
e = StreamErrorNode(xmlnode)
self.lock.release()
try:
self.process_stream_error(e)
finally:
self.lock.acquire()
e.free()
return None
elif xmlnode.name == 'features':
self._StreamBase__logger.debug('Got stream features')
self._StreamBase__logger.debug('Node: %r' % (xmlnode,))
self.features = xmlnode.copyNode(1)
self.doc_in.addChild(self.features)
self._got_features()
return None
self._StreamBase__logger.debug('Unhandled stream node: %r' % (xmlnode.serialize(),))
def process_stream_error(self, err):
self._StreamBase__logger.debug('Unhandled stream error: condition: %s %r' % (err.get_condition().name, err.serialize()))
def check_to(self, to):
if to != self.me:
return None
return to
def generate_id(self):
return '%i-%i-%s' % (os.getpid(), time.time(), str(random.random())[2:])
def _got_features(self):
ctxt = self.doc_in.xpathNewContext()
ctxt.setContextNode(self.features)
ctxt.xpathRegisterNs('stream', STREAM_NS)
ctxt.xpathRegisterNs('bind', BIND_NS)
bind_n = None
try:
bind_n = ctxt.xpathEval('bind:bind')
finally:
ctxt.xpathFreeContext()
if self.authenticated:
if bind_n:
self.bind(self.me.resource)
else:
self.state_change('authorized', self.me)
def bind(self, resource):
iq = Iq(stanza_type = 'set')
q = iq.new_query(BIND_NS, u'bind')
if resource:
q.newTextChild(None, 'resource', to_utf8(resource))
self.state_change('binding', resource)
self.set_response_handlers(iq, self._bind_success, self._bind_error)
self.send(iq)
iq.free()
def _bind_success(self, stanza):
jid_n = stanza.xpath_eval('bind:bind/bind:jid', {
'bind': BIND_NS })
if jid_n:
self.me = JID(jid_n[0].getContent().decode('utf-8'))
self.state_change('authorized', self.me)
def _bind_error(self, stanza):
raise FatalStreamError, 'Resource binding failed'
def connected(self):
if self.doc_in and self.doc_out and not (self.eof):
return True
else:
return False